1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.observables;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.junit.Assert.assertTrue;
20  import static org.junit.Assert.fail;
21  
22  import java.util.Iterator;
23  import java.util.NoSuchElementException;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.junit.Assert;
28  import org.junit.Before;
29  import org.junit.Test;
30  import org.mockito.Mock;
31  import org.mockito.MockitoAnnotations;
32  
33  import rx.Observable;
34  import rx.Observable.OnSubscribe;
35  import rx.Subscriber;
36  import rx.exceptions.TestException;
37  import rx.functions.Action0;
38  import rx.functions.Action1;
39  import rx.functions.Func1;
40  import rx.schedulers.Schedulers;
41  import rx.subscriptions.Subscriptions;
42  
43  public class BlockingObservableTest {
44  
45      @Mock
46      Subscriber<Integer> w;
47  
48      @Before
49      public void before() {
50          MockitoAnnotations.initMocks(this);
51      }
52  
53      @Test
54      public void testLast() {
55          BlockingObservable<String> obs = BlockingObservable.from(Observable.just("one", "two", "three"));
56  
57          assertEquals("three", obs.last());
58      }
59  
60      @Test(expected = NoSuchElementException.class)
61      public void testLastEmptyObservable() {
62          BlockingObservable<Object> obs = BlockingObservable.from(Observable.empty());
63          obs.last();
64      }
65  
66      @Test
67      public void testLastOrDefault() {
68          BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(1, 0, -1));
69          int last = observable.lastOrDefault(-100, new Func1<Integer, Boolean>() {
70              @Override
71              public Boolean call(Integer args) {
72                  return args >= 0;
73              }
74          });
75          assertEquals(0, last);
76      }
77  
78      @Test
79      public void testLastOrDefault1() {
80          BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
81          assertEquals("three", observable.lastOrDefault("default"));
82      }
83  
84      @Test
85      public void testLastOrDefault2() {
86          BlockingObservable<Object> observable = BlockingObservable.from(Observable.empty());
87          assertEquals("default", observable.lastOrDefault("default"));
88      }
89  
90      @Test
91      public void testLastOrDefaultWithPredicate() {
92          BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(1, 0, -1));
93          int last = observable.lastOrDefault(0, new Func1<Integer, Boolean>() {
94              @Override
95              public Boolean call(Integer args) {
96                  return args < 0;
97              }
98          });
99  
100         assertEquals(-1, last);
101     }
102 
103     @Test
104     public void testLastOrDefaultWrongPredicate() {
105         BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(-1, -2, -3));
106         int last = observable.lastOrDefault(0, new Func1<Integer, Boolean>() {
107             @Override
108             public Boolean call(Integer args) {
109                 return args >= 0;
110             }
111         });
112         assertEquals(0, last);
113     }
114 
115     @Test
116     public void testLastWithPredicate() {
117         BlockingObservable<String> obs = BlockingObservable.from(Observable.just("one", "two", "three"));
118         assertEquals("two", obs.last(new Func1<String, Boolean>() {
119             @Override
120             public Boolean call(String s) {
121                 return s.length() == 3;
122             }
123         }));
124     }
125 
126     @Test
127     public void testSingle() {
128         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one"));
129         assertEquals("one", observable.single());
130     }
131 
132     @Test
133     public void testSingleDefault() {
134         BlockingObservable<Object> observable = BlockingObservable.from(Observable.empty());
135         assertEquals("default", observable.singleOrDefault("default"));
136     }
137 
138     @Test(expected = IllegalArgumentException.class)
139     public void testSingleDefaultPredicateMatchesMoreThanOne() {
140         BlockingObservable.from(Observable.just("one", "two")).singleOrDefault("default", new Func1<String, Boolean>() {
141             @Override
142             public Boolean call(String args) {
143                 return args.length() == 3;
144             }
145         });
146     }
147 
148     @Test
149     public void testSingleDefaultPredicateMatchesNothing() {
150         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two"));
151         String result = observable.singleOrDefault("default", new Func1<String, Boolean>() {
152             @Override
153             public Boolean call(String args) {
154                 return args.length() == 4;
155             }
156         });
157         assertEquals("default", result);
158     }
159 
160     @Test(expected = IllegalArgumentException.class)
161     public void testSingleDefaultWithMoreThanOne() {
162         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
163         observable.singleOrDefault("default");
164     }
165 
166     @Test
167     public void testSingleWithPredicateDefault() {
168         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "four"));
169         assertEquals("four", observable.single(new Func1<String, Boolean>() {
170             @Override
171             public Boolean call(String s) {
172                 return s.length() == 4;
173             }
174         }));
175     }
176 
177     @Test(expected = IllegalArgumentException.class)
178     public void testSingleWrong() {
179         BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(1, 2));
180         observable.single();
181     }
182 
183     @Test(expected = NoSuchElementException.class)
184     public void testSingleWrongPredicate() {
185         BlockingObservable<Integer> observable = BlockingObservable.from(Observable.just(-1));
186         observable.single(new Func1<Integer, Boolean>() {
187             @Override
188             public Boolean call(Integer args) {
189                 return args > 0;
190             }
191         });
192     }
193 
194     @Test
195     public void testToIterable() {
196         BlockingObservable<String> obs = BlockingObservable.from(Observable.just("one", "two", "three"));
197 
198         Iterator<String> it = obs.toIterable().iterator();
199 
200         assertEquals(true, it.hasNext());
201         assertEquals("one", it.next());
202 
203         assertEquals(true, it.hasNext());
204         assertEquals("two", it.next());
205 
206         assertEquals(true, it.hasNext());
207         assertEquals("three", it.next());
208 
209         assertEquals(false, it.hasNext());
210 
211     }
212 
213     @Test(expected = NoSuchElementException.class)
214     public void testToIterableNextOnly() {
215         BlockingObservable<Integer> obs = BlockingObservable.from(Observable.just(1, 2, 3));
216 
217         Iterator<Integer> it = obs.toIterable().iterator();
218 
219         Assert.assertEquals((Integer) 1, it.next());
220         Assert.assertEquals((Integer) 2, it.next());
221         Assert.assertEquals((Integer) 3, it.next());
222 
223         it.next();
224     }
225 
226     @Test(expected = NoSuchElementException.class)
227     public void testToIterableNextOnlyTwice() {
228         BlockingObservable<Integer> obs = BlockingObservable.from(Observable.just(1, 2, 3));
229 
230         Iterator<Integer> it = obs.toIterable().iterator();
231 
232         Assert.assertEquals((Integer) 1, it.next());
233         Assert.assertEquals((Integer) 2, it.next());
234         Assert.assertEquals((Integer) 3, it.next());
235 
236         boolean exc = false;
237         try {
238             it.next();
239         } catch (NoSuchElementException ex) {
240             exc = true;
241         }
242         Assert.assertEquals(true, exc);
243 
244         it.next();
245     }
246 
247     @Test
248     public void testToIterableManyTimes() {
249         BlockingObservable<Integer> obs = BlockingObservable.from(Observable.just(1, 2, 3));
250 
251         Iterable<Integer> iter = obs.toIterable();
252 
253         for (int j = 0; j < 3; j++) {
254             Iterator<Integer> it = iter.iterator();
255 
256             Assert.assertTrue(it.hasNext());
257             Assert.assertEquals((Integer) 1, it.next());
258             Assert.assertTrue(it.hasNext());
259             Assert.assertEquals((Integer) 2, it.next());
260             Assert.assertTrue(it.hasNext());
261             Assert.assertEquals((Integer) 3, it.next());
262             Assert.assertFalse(it.hasNext());
263         }
264     }
265 
266     @Test(expected = TestException.class)
267     public void testToIterableWithException() {
268         BlockingObservable<String> obs = BlockingObservable.from(Observable.create(new Observable.OnSubscribe<String>() {
269 
270             @Override
271             public void call(Subscriber<? super String> observer) {
272                 observer.onNext("one");
273                 observer.onError(new TestException());
274             }
275         }));
276 
277         Iterator<String> it = obs.toIterable().iterator();
278 
279         assertEquals(true, it.hasNext());
280         assertEquals("one", it.next());
281 
282         assertEquals(true, it.hasNext());
283         it.next();
284 
285     }
286 
287     @Test
288     public void testForEachWithError() {
289         try {
290             BlockingObservable.from(Observable.create(new Observable.OnSubscribe<String>() {
291 
292                 @Override
293                 public void call(final Subscriber<? super String> observer) {
294                     new Thread(new Runnable() {
295 
296                         @Override
297                         public void run() {
298                             observer.onNext("one");
299                             observer.onNext("two");
300                             observer.onNext("three");
301                             observer.onCompleted();
302                         }
303                     }).start();
304                 }
305             })).forEach(new Action1<String>() {
306 
307                 @Override
308                 public void call(String t1) {
309                     throw new RuntimeException("fail");
310                 }
311             });
312             fail("we expect an exception to be thrown");
313         } catch (Throwable e) {
314             // do nothing as we expect this
315         }
316     }
317 
318     @Test
319     public void testFirst() {
320         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
321         assertEquals("one", observable.first());
322     }
323 
324     @Test(expected = NoSuchElementException.class)
325     public void testFirstWithEmpty() {
326         BlockingObservable.from(Observable.<String> empty()).first();
327     }
328 
329     @Test
330     public void testFirstWithPredicate() {
331         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
332         String first = observable.first(new Func1<String, Boolean>() {
333             @Override
334             public Boolean call(String args) {
335                 return args.length() > 3;
336             }
337         });
338         assertEquals("three", first);
339     }
340 
341     @Test(expected = NoSuchElementException.class)
342     public void testFirstWithPredicateAndEmpty() {
343         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
344         observable.first(new Func1<String, Boolean>() {
345             @Override
346             public Boolean call(String args) {
347                 return args.length() > 5;
348             }
349         });
350     }
351 
352     @Test
353     public void testFirstOrDefault() {
354         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
355         assertEquals("one", observable.firstOrDefault("default"));
356     }
357 
358     @Test
359     public void testFirstOrDefaultWithEmpty() {
360         BlockingObservable<String> observable = BlockingObservable.from(Observable.<String> empty());
361         assertEquals("default", observable.firstOrDefault("default"));
362     }
363 
364     @Test
365     public void testFirstOrDefaultWithPredicate() {
366         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
367         String first = observable.firstOrDefault("default", new Func1<String, Boolean>() {
368             @Override
369             public Boolean call(String args) {
370                 return args.length() > 3;
371             }
372         });
373         assertEquals("three", first);
374     }
375 
376     @Test
377     public void testFirstOrDefaultWithPredicateAndEmpty() {
378         BlockingObservable<String> observable = BlockingObservable.from(Observable.just("one", "two", "three"));
379         String first = observable.firstOrDefault("default", new Func1<String, Boolean>() {
380             @Override
381             public Boolean call(String args) {
382                 return args.length() > 5;
383             }
384         });
385         assertEquals("default", first);
386     }
387 
388     @Test
389     public void testSingleOrDefaultUnsubscribe() throws InterruptedException {
390         final CountDownLatch unsubscribe = new CountDownLatch(1);
391         Observable<Integer> o = Observable.create(new OnSubscribe<Integer>() {
392             @Override
393             public void call(Subscriber<? super Integer> subscriber) {
394                 subscriber.add(Subscriptions.create(new Action0() {
395                     @Override
396                     public void call() {
397                         unsubscribe.countDown();
398                     }
399                 }));
400                 subscriber.onNext(1);
401                 subscriber.onNext(2);
402                 // Don't call `onCompleted` to emulate an infinite stream
403             }
404         }).subscribeOn(Schedulers.newThread());
405         try {
406             o.toBlocking().singleOrDefault(-1);
407             fail("Expected IllegalArgumentException because there are 2 elements");
408         } catch (IllegalArgumentException e) {
409             // Expected
410         }
411         assertTrue("Timeout means `unsubscribe` is not called", unsubscribe.await(30, TimeUnit.SECONDS));
412     }
413 
414     @Test
415     public void testUnsubscribeFromSingleWhenInterrupted() throws InterruptedException {
416         new InterruptionTests().assertUnsubscribeIsInvoked("single()", new Action1<BlockingObservable<Void>>() {
417             @Override
418             public void call(final BlockingObservable<Void> o) {
419                 o.single();
420             }
421         });
422     }
423 
424     @Test
425     public void testUnsubscribeFromForEachWhenInterrupted() throws InterruptedException {
426         new InterruptionTests().assertUnsubscribeIsInvoked("forEach()", new Action1<BlockingObservable<Void>>() {
427             @Override
428             public void call(final BlockingObservable<Void> o) {
429                 o.forEach(new Action1<Void>() {
430                     @Override
431                     public void call(final Void aVoid) {
432                         // nothing
433                     }
434                 });
435             }
436         });
437     }
438 
439     @Test
440     public void testUnsubscribeFromFirstWhenInterrupted() throws InterruptedException {
441         new InterruptionTests().assertUnsubscribeIsInvoked("first()", new Action1<BlockingObservable<Void>>() {
442             @Override
443             public void call(final BlockingObservable<Void> o) {
444                 o.first();
445             }
446         });
447     }
448 
449     @Test
450     public void testUnsubscribeFromLastWhenInterrupted() throws InterruptedException {
451         new InterruptionTests().assertUnsubscribeIsInvoked("last()", new Action1<BlockingObservable<Void>>() {
452             @Override
453             public void call(final BlockingObservable<Void> o) {
454                 o.last();
455             }
456         });
457     }
458 
459     @Test
460     public void testUnsubscribeFromLatestWhenInterrupted() throws InterruptedException {
461         new InterruptionTests().assertUnsubscribeIsInvoked("latest()", new Action1<BlockingObservable<Void>>() {
462             @Override
463             public void call(final BlockingObservable<Void> o) {
464                 o.latest().iterator().next();
465             }
466         });
467     }
468 
469     @Test
470     public void testUnsubscribeFromNextWhenInterrupted() throws InterruptedException {
471         new InterruptionTests().assertUnsubscribeIsInvoked("next()", new Action1<BlockingObservable<Void>>() {
472             @Override
473             public void call(final BlockingObservable<Void> o) {
474                 o.next().iterator().next();
475             }
476         });
477     }
478 
479     @Test
480     public void testUnsubscribeFromGetIteratorWhenInterrupted() throws InterruptedException {
481         new InterruptionTests().assertUnsubscribeIsInvoked("getIterator()", new Action1<BlockingObservable<Void>>() {
482             @Override
483             public void call(final BlockingObservable<Void> o) {
484                 o.getIterator().next();
485             }
486         });
487     }
488 
489     @Test
490     public void testUnsubscribeFromToIterableWhenInterrupted() throws InterruptedException {
491         new InterruptionTests().assertUnsubscribeIsInvoked("toIterable()", new Action1<BlockingObservable<Void>>() {
492             @Override
493             public void call(final BlockingObservable<Void> o) {
494                 o.toIterable().iterator().next();
495             }
496         });
497     }
498 
499     /** Utilities set for interruption behaviour tests. */
500     private static class InterruptionTests {
501 
502         private boolean isUnSubscribed;
503         private RuntimeException error;
504         private CountDownLatch latch = new CountDownLatch(1);
505 
506         private Observable<Void> createObservable() {
507             return Observable.<Void>never().doOnUnsubscribe(new Action0() {
508                 @Override
509                 public void call() {
510                     isUnSubscribed = true;
511                 }
512             });
513         }
514 
515         private void startBlockingAndInterrupt(final Action1<BlockingObservable<Void>> blockingAction) {
516             Thread subscriptionThread = new Thread() {
517                 @Override
518                 public void run() {
519                     try {
520                         blockingAction.call(createObservable().toBlocking());
521                     } catch (RuntimeException e) {
522                         if (!(e.getCause() instanceof InterruptedException)) {
523                             error = e;
524                         }
525                     }
526                     latch.countDown();
527                 }
528             };
529             subscriptionThread.start();
530             subscriptionThread.interrupt();
531         }
532 
533         void assertUnsubscribeIsInvoked(final String method, final Action1<BlockingObservable<Void>> blockingAction)
534             throws InterruptedException {
535             startBlockingAndInterrupt(blockingAction);
536             assertTrue("Timeout means interruption is not performed", latch.await(30, TimeUnit.SECONDS));
537             if (error != null) {
538                 throw error;
539             }
540             assertTrue("'unsubscribe' is not invoked when thread is interrupted for " + method, isUnSubscribed);
541         }
542 
543     }
544 
545 }